package com.youxin.infra.sink;

import com.youxin.infra.entity.ItemSink;
import com.youxin.infra.util.KafkaProduUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class KafkaSink extends RichSinkFunction<ItemSink> {

    KafkaProduUtil kafkaProduUtil;

    @Override
    public void open(Configuration parameters) throws Exception {

        kafkaProduUtil = KafkaProduUtil.getInstance();
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void invoke(ItemSink value, Context context) throws Exception {
        kafkaProduUtil.sendMess(value.toString());
    }
}
